# Guided Hunting - Use Machine Learning to Detect Potential Low and Slow Password Sprays using Apache Spark via Azure Synapse

__Notebook Version:__ 1.0<br>
__Python Version:__ Python 3.8<br>
__Required Packages:__ azureml-synapse, msticpy, azure-storage-file-datalake <br>
__Platforms Supported:__  Azure Machine Learning Notebooks connected to Azure Synapse Workspace
     
__Data Source Required:__ Yes

__Data Source:__ SigninLogs

__Spark Version:__ 3.1 or above
    
## Description
This guided hunting notebook leverages machine learning to tackle the difficult problem of detecting low and slow password spray campaigns. (This augments the broader-scoped password spray detection already provided via Microsoft’s Identity Protection Integration for Sentinel.)
We leverage the built-in parallelism of PySpark and MLlib (via the Azure Synapse linked service) to ingest, query and analyse data at scale.

Low and slow sprays are a variant on traditional password spray attacks that are being increasingly used by sophisticated adversaries.
These adversaries can randomize client fields between sign-in attempts, including IP addresses, user agents and client applications, and are often willing to let password spray campaigns run at a very low frequency over a period of months or years, making detection very challenging.  
A key observation that we exploit in this notebook is the fact that, within a single campaign, attackers often randomize the same large number of properties simultaneously, resulting in a group of logins occurring periodically over a long period of time with the same set of anomalous properties.

This notebook runs through the following ML-driven approach to surfacing potential low and slow sprays. (For more details on the approach see the accompanying Microsoft Tech Community blog post: [Microsoft Sentinel Blog - Microsoft Tech Community](https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/bg-p/MicrosoftSentinelBlog).)

1.	**Detect anomalous fields for each failed sign-in** attempt using successful sign-ins as a baseline
2.	**Use ML to cluster failed sign-ins** by the columns which were randomized/anomalous
3.	**Prune the clusters** from the previous step based on knowledge of what a low and slow spray looks like; for example, by removing clusters in which sign-ins do not occur at a steady frequency over an extended period of time
4.	**Further analyze the candidate password spray clusters** (using threat intelligence enrichments from msticpy, for example), to find any invariant properties within the clusters
5.	**Identify any successful sign-ins that follow the patterns** observed for each cluster from the previous step and create Sentinel incidents as appropriate

**Related MITRE ATT&CK techniques:**
- [T1110: Brute Force](https://attack.mitre.org/techniques/T1110/)
   - [T1110.003: Password Spraying](https://attack.mitre.org/techniques/T1110/003/)
   - [T1110.004: Credential Stuffing](https://attack.mitre.org/techniques/T1110/004/)
- [T1078: Valid Accounts](https://attack.mitre.org/techniques/T1078/)
   - [T1078.004: Cloud Accounts](https://attack.mitre.org/techniques/T1078/004/)
   
## Pre-Requisites

1. This notebook also makes use of the Azure Synapse integration for Sentinel notebooks. To set up the Synapse integration, please use the notebook [Configurate Azure ML and Azure Synapse Analytics](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/Configurate%20Azure%20ML%20and%20Azure%20Synapse%20Analytics.ipynb).
2. Ensure that the `bayespy ~= 0.5.22` Python package is installed on your Spark pool. You can do this by uploading a `requirements.txt` file as detailed in the [docs](https://docs.microsoft.com/azure/synapse-analytics/spark/apache-spark-manage-python-packages#pool-libraries).
3. Ensure that Sentinel SigninLogs data has been exported to an appropriate ADLS storage container. To export the necessary data:
   - Set up a continuous log export rule
   - Do a one-time export of historical data$^*$  

$^*$: A walkthrough of the one-time export of historical log data is available in a TechCommunity blog post here: [Export Historical Data from Log Analytics (microsoft.com)](https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/export-historical-log-data-from-microsoft-sentinel/ba-p/3413418).
<br>
The template notebook is available via the Sentinel UI or on GitHub: [Export Historical Log Data (GitHub)](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/scenario-notebooks/Export%20Historical%20Log%20Data.ipynb).

**Python modules may need to be downloaded.**  
**Please run the cells sequentially to avoid errors.  Please do not use "run all cells".**

## Table of Contents
1. Warm-up
2. Authentication to Azure Resources
3. Configure Azure ML and Azure Synapse Analytics
4. Load the Data
5. Data Cleansing using Spark
6. Data Science using Spark
7. Enriching the Results
8. Conclusion



# 1. Setup

## Install Packages
> **Note**: Install the packages below only the first time you run this notebook, and restart the kernel once done.

In [None]:
# Install AzureML Synapse package to use spark magics
import sys
!{sys.executable} -m pip install --upgrade azureml-synapse

In [None]:
# Install Azure storage datalake library to manipulate file systems
import sys
!{sys.executable} -m pip install --upgrade azure-storage-file-datalake --pre

In [None]:
# Install msticpy for enhanced security data analysis
import sys
!{sys.executable} -m pip install --upgrade msticpy[azure]

*** $\color{red}{Note:~After~installing~the~packages,~please~restart~the~kernel.}$ ***

## Initialize `msticpy`

The `nbinit` module loads required libraries and optionally installs required packages.

In [None]:
# Load Python libraries that will be used in the non-Synapse portion of this notebook
from datetime import timedelta, datetime
import importlib
import json

from azureml.core import Workspace, LinkedService, SynapseWorkspaceLinkedServiceConfiguration, Datastore

from azure.storage.filedatalake import DataLakeServiceClient
from azure.core._match_conditions import MatchConditions
from azureml.core.compute import ComputeTarget, SynapseCompute
from azure.storage.filedatalake._models import ContentSettings
from msticpy.nbtools import nbinit

import pandas as pd
from IPython import get_ipython
from IPython.display import display, HTML
from ipywidgets import Dropdown, HBox, IntSlider, Label, Layout
from scipy.stats import ks_1samp, ks_2samp
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)


REQ_PYTHON_VER = "3.10"
REQ_MSTICPY_VER = "2.12.0"

display(HTML("<h3>Starting Notebook setup...</h3>"))
nbinit.init_notebook(namespace=globals())


WIDGET_DEFAULTS = {
    "layout": Layout(width="95%"),
    "style": {"description_width": "initial"},
}

#Set pandas options
pd.set_option('display.max_rows', 10)
pd.set_option('max_colwidth', 50)

## Configure Azure ML and Azure Synapse Analytics

If you haven't previously set up the Synapse linked service for AzureML, please use the notebook, [Configurate Azure ML and Azure Synapse Analytics](https://github.com/Azure/Azure-Sentinel-Notebooks/blob/master/Configurate%20Azure%20ML%20and%20Azure%20Synapse%20Analytics.ipynb), to do so. The notebook will configure an existing Azure Synapse workspace to create and connect to a Spark pool. You can then create a linked service and connect the AML workspace to the Azure Synapse workspace.<br>

You will also need to ensure that the `bayespy ~= 0.5.22` Python package is installed on your Spark pool. You can do this by uploading a `requirements.txt` file as detailed in the [docs](https://docs.microsoft.com/azure/synapse-analytics/spark/apache-spark-manage-python-packages#pool-libraries).

### Authentication to Azure Resources

We now connect the AML workspace to the Azure Synapse workspace using the linked service.

> **Note**: Specify the input parameters in the step below in order to connect to the Spark attached compute.

In [None]:
amlworkspace = '<aml workspace name>'  # fill in your AML workspace name
subscription_id = '<subscription id>' # fill in your subscription id
resource_group = '<resource group of AML workspace>' # fill in the resource group of your AML workspace
linkedservice = '<linked service name>' # fill in your linked service created to connect to Synapse workspace

In [None]:
# Get the aml workspace
aml_workspace = Workspace.get(name=amlworkspace, subscription_id=subscription_id, resource_group=resource_group)

# Retrieve a known linked service
linked_service = LinkedService.get(aml_workspace, linkedservice)

### Start Spark Session
Enter your Synapse Spark compute below. To view details of available Spark computes in the AML UI, please follow these steps:

1. On the AML Studio left menu, navigate to **Linked Services**
2. Click on the name of the linked service you want to use
3. Select the **Spark pools** tab

> **Note:** The Python contexts for the AML notebooks session and the Spark session are separate - this means that all Python variables defined using the `%%synapse` cell magic are not available in the AML notebook session and vice-versa.

In [None]:
available_spark_compute_targets = [compute.name for compute in ComputeTarget.list(aml_workspace) if compute.type == 'SynapseSpark']
synapse_spark_compute_dd = Dropdown(options=available_spark_compute_targets)
HBox([Label('Choose Synapse Spark compute:'), synapse_spark_compute_dd])

In order to work with months or years of data in an efficient, scalable way, we make use of Spark's native multi-executor parallelism. 
The code in this notebook will scale to any number of nodes, though the optimal performance-vs-cost balance will depend on the volume of your data - 10 executors may be a reasonable starting point. (See [pricing details](https://azure.microsoft.com/pricing/details/synapse-analytics/).)

> **Note:** Make sure you have selected your Synapse Spark compute from the drop-down in the previous cell _before_ running the cell below.

In [None]:
synapse_spark_compute = synapse_spark_compute_dd.value
compute = SynapseCompute(aml_workspace, synapse_spark_compute)
num_executors_slider = IntSlider(
    value=compute.min_node_count,
    min=compute.min_node_count,
    max=compute.max_node_count - 2,
    step=1,
)
HBox([Label('Maximum number of executors for Spark session:'), num_executors_slider])

Now we start the Spark session with the configuration options selected above.

> **Note:** You can also use the Synapse line/cell magic to start a session if you do not need to expand variables in your spark configuration - e.g.  
`%synapse start -s $subscription_id -w $amlworkspace -r $resource_group -c $synapse_spark_compute`  
More details are here: [RemoteSynapseMagics class - Azure Machine Learning Python | Microsoft Docs](https://docs.microsoft.com/python/api/azureml-synapse/azureml.synapse.magics.remotesynapsemagics(class)?view=azure-ml-py#azureml-synapse-magics-remotesynapsemagics-synapse)

In [None]:
# Start Spark session
spark_config = {
    "spark.dynamicAllocation.enabled": "true",
    "spark.dynamicAllocation.maxExecutors": num_executors_slider.value,
    "spark.dynamicAllocation.minExecutors": compute.min_node_count,
    "spark.shuffle.service.enabled": "true",
}
spark_config_json = json.dumps(spark_config)
get_ipython().run_cell_magic(
    'synapse',
    'start -s $subscription_id -w $amlworkspace -r $resource_group -c $synapse_spark_compute',
    spark_config_json
)

# 2. Run ML on Azure Synapse Spark

## Overview of ML Approach

Our novel ML approach begins with the observation that attackers often randomize the same large number of properties simultaneously, resulting in a group of logins occurring periodically over a long period of time with the same set of anomalous properties. 
Thus, we can attempt to cluster failed sign-ins (most password spray sign-in attempts will fail!) based on the set of properties that are anomalous.<br>

We use a naive Bayes approach to estimate the likelihood of any given property value occurring for a legitimate sign-in and then use outlier detection to highlight unlikely values as being "anomalous". This gives a dataset in which the rows comprise a (failed) sign-in ID and boolean flags for each sign-in property denoting whether or not that property took an anomalous value. 
We model this scenario as a multivariate Bernoulli mixture model, and perform [variational Bayesian inference](https://en.wikipedia.org/wiki/Variational_Bayesian_methods) to detect the presence of latent classes which will be our candidates for low and slow password spray campaigns. 
Later, we filter these candidate low and slow clusters by computing various statistics (such as the uniformity of the time-distribution of the sign-ins) and comparing these against what we would expect from a low and slow password spray.

For more details, see the accompanying Microsoft Tech Community blog post: [Microsoft Sentinel Blog - Microsoft Tech Community](https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/bg-p/MicrosoftSentinelBlog).

The overall approach looks like this:

1.	**Detect anomalous fields for each failed sign-in** attempt using successful sign-ins as a baseline
2.	**Cluster failed sign-ins** by the columns which were randomized/anomalous
3.	**Prune the clusters** from the previous step based on knowledge of what a low and slow spray looks like; for example, by removing clusters in which sign-ins do not occur at a steady frequency over an extended period of time
4.	**Further analyze the candidate password spray clusters** (using threat intelligence enrichments from msticpy, for example), to find any invariant properties within the clusters
5.	**Identify any successful sign-ins that follow the patterns** observed for each cluster from the previous step and create Sentinel incidents as appropriate
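
To make step 3 more concrete, the sketch below scores how evenly a cluster's sign-ins are spread over the observation window, using a one-sample Kolmogorov-Smirnov statistic against a uniform distribution. This is only an illustration in plain Python: the function name `ks_uniform_statistic`, the sample timestamps and the choice of statistic are assumptions, not necessarily the pruning logic used later in this notebook.

```python
from datetime import datetime, timedelta


def ks_uniform_statistic(timestamps, start, end):
    """Return the one-sample KS statistic of timestamps against Uniform(start, end).

    Values near 0 mean the sign-ins are spread evenly over the window;
    values near 1 mean they are concentrated in a short burst.
    """
    window = (end - start).total_seconds()
    # Rescale each timestamp to [0, 1]; the uniform CDF is then F(x) = x
    xs = sorted((t - start).total_seconds() / window for t in timestamps)
    n = len(xs)
    # Largest gap between the empirical CDF and the uniform CDF, in both directions
    d_plus = max((i + 1) / n - x for i, x in enumerate(xs))
    d_minus = max(x - i / n for i, x in enumerate(xs))
    return max(d_plus, d_minus)


start = datetime(2021, 1, 1)
end = start + timedelta(days=240)

# Hypothetical cluster 1: one sign-in every 24 days across the whole window
steady = [start + timedelta(days=12 + 24 * i) for i in range(10)]
# Hypothetical cluster 2: ten sign-ins within the first ten days
burst = [start + timedelta(days=i) for i in range(10)]

steady_score = ks_uniform_statistic(steady, start, end)
burst_score = ks_uniform_statistic(burst, start, end)
print(f"steady: {steady_score:.2f}, burst: {burst_score:.2f}")
```

A cluster with a low score is a better match for a low and slow campaign than one with a high score, although any cut-off value would need to be tuned on real data.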

Having started the Spark session, we can run PySpark code by starting a cell with the `%%synapse` cell magic.  
Spark and MLlib are written with efficient parallelisation in mind, meaning that data ETL, analysis and ML are distributed across executors by default, allowing for highly scalable workloads.

**Spark and MLlib References:** 

- [User Guide — PySpark 3.2.1 documentation (apache.org)](https://spark.apache.org/docs/latest/api/python/user_guide/index.html)
- [Spark SQL — PySpark 3.2.1 documentation (apache.org)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html)
- [MLlib (DataFrame-based) — PySpark 3.2.1 documentation (apache.org)](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)

We start by importing the packages we will need for the ML into the current session.

> **Note:** The Python contexts for the AML notebooks session and the Spark session are separate - this means that Python packages imported using the `%%synapse` cell magic are not imported into the AML notebook session and vice-versa.

In [None]:
%%synapse

# Import packages used for the Spark portion of this notebook
from datetime import timedelta, datetime, date
from itertools import chain

from pyspark.ml.feature import IndexToString, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.stat import Summarizer
from pyspark.sql import functions as F
from pyspark.sql.functions import col, lit, pandas_udf
from pyspark.sql.types import *
from pyspark.sql.window import Window

# NOTE: As per the notebook pre-requisites, ensure that you have `bayespy` installed on your Spark compute!
from bayespy.inference import VB
from bayespy.nodes import Bernoulli, Beta, Categorical, Dirichlet, Mixture
import bayespy.plot as bpplt
from bayespy.utils import random
import pandas as pd
import numpy as np
from scipy.stats import entropy

## Load Data

Fill in the location details for the ADLS container to which the Sentinel SigninLogs are exported. 

We also specify how much data we want to use with the ML algorithm by specifying an end date and a number of lookback days. Keep in mind that low and slow password sprays take place over long periods (typically months or even years).<br>
You will also need to ensure that sufficient historical log data is actually available in ADLS. 

In [None]:
%%synapse

# Primary storage info
account_name = '<storage account name>' # fill in your primary account name
container_name = '<container name>' # fill in your container name
subscription_id = '<subscription id>' # fill in your subscription id
resource_group = '<resource group>' # fill in the resource group for ADLS
workspace_name = '<Microsoft sentinel/log analytics workspace name>' # fill in your workspace name

# Datetime and lookback parameters
end_date = "<enter date in the format yyyy-MM-dd e.g. 2021-09-17 or datetime.today().strftime('%Y-%m-%d')>"  # fill in your end date
lookback_days = 240  # how many days prior to the end date to include; make sure you have historical data available in ADLS

The information from the above cell is used to determine the ADLS paths for the data we want to load (based on the partition scheme used by the "continuous data export" tool in Sentinel).

In [None]:
%%synapse

# Compiling ADLS paths from date strings
def generate_adls_paths(end_date_str: str, lookback_days: int, adls_path: str):
    end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
    days = [end_date - timedelta(days=i) for i in range(lookback_days + 1)]

    pathlist = []
    for day in days:
        date_str = day.strftime('%Y-%m-%d').split('-')
        day_path = adls_path + f'/y={date_str[0]}/m={date_str[1]}/d={date_str[2]}'
        pathlist.append(day_path)

    return pathlist

# This is the root directory to which data from the Sentinel continuous data export tool is written
adls_path = (
    f'abfss://{container_name}@{account_name}.dfs.core.windows.net/WorkspaceResourceId=/'
    f'subscriptions/{subscription_id}/'
    f'resourcegroups/{resource_group.lower()}/'
    f'providers/microsoft.operationalinsights/'
    f'workspaces/{workspace_name.lower()}'
)
# This gives a list of ADLS paths from which we want Spark to read (recursively)
per_day_log_paths = generate_adls_paths(end_date, lookback_days, adls_path)

Now we can read the data into a Spark dataframe. It is worth noting that, since the exported log data comprises a separate data file for each 5-minute partition, we may be reading from over 100,000 files. 
Therefore, you may wish to increase the maximum number of executors available to the Azure Synapse Spark session - this will allow this operation to be massively parallelized automatically, dramatically reducing time taken.
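
As a rough sense of scale, the arithmetic below counts the 5-minute partitions covered by a lookback window. It assumes exactly one exported file per 5-minute partition, as described above; the real number of files depends on your export configuration and log volume, and the helper name `estimated_file_count` is purely illustrative.

```python
# One partition per 5 minutes -> 12 per hour -> 288 per day
partitions_per_day = 24 * 60 // 5


def estimated_file_count(lookback_days: int) -> int:
    """Estimate the file count, assuming one file per 5-minute partition.

    The end date itself is included, hence lookback_days + 1 days in total.
    """
    return (lookback_days + 1) * partitions_per_day


default_window = estimated_file_count(240)  # the default lookback in this notebook
one_year = estimated_file_count(365)        # a full year of history
print(default_window, one_year)
```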

### Feature Selection

Here, we also specify the columns that we want to read into the Spark dataframe. The list suggested below comprises some core sign in properties - `Id`, `UserPrincipalName`, `ResultType`, `TimeGenerated` - and some additional properties (which we refer to as "features" for the ML).  
The features below have been selected to help spot behaviors that make password sprays stand out, e.g.

- Features (properties) that an attacker is able to randomise (e.g. IP addresses, location details, user agent-derived fields)
- Features (properties) where the "normal" values are concealed from attackers (so are hard for an attacker to guess) (e.g. operating system (included in DeviceDetail), browser, city)

_(Some features fall into both categories)_

In [None]:
%%synapse

# Specify the columns to select
column_list = [
    "Id",
    "UserPrincipalName",
    "ResultType",
    "TimeGenerated",
    "AppDisplayName",
    "ClientAppUsed",
    "DeviceDetail",
    "IPAddress",
    "Location",
    "LocationDetails",
    "IsInteractive",
]

# Read the data from ADLS into a Spark dataframe, selecting the columns specified above
try:
    df = spark.read.json(per_day_log_paths, recursiveFileLookup=True)

    # AutonomousSystemNumber is a new field and may not yet be available in all logs
    # if available, this is preferred for use in analysis over IPAddress
    if "AutonomousSystemNumber" in df.columns:
        column_list += ["AutonomousSystemNumber"]

    df = df.select(*column_list)

    #Display the count of records
    print(f"\n No. of records loaded from the past {lookback_days} days: {df.count()}")

except Exception as e:
    # If you see "path doesn't exist" errors, it may be because the lookback_days parameter is set further back than the amount of historical data available
    print(f"Could not load data due to error:\n\t {e}")

## Data Wrangling using Spark

### Filtering data

We start by filtering the data set by result types, keeping result types: 
- 0 (successful sign in)
- 50055 (expired password)
- 50126 (incorrect username or password)

The latter two failure errors are the ones most commonly observed as part of password sprays.  

_See [Azure AD Authentication and authorization error codes](https://docs.microsoft.com/azure/active-directory/develop/reference-aadsts-error-codes) for more details._

In [None]:
%%synapse

# We focus on failed login types 50055 and 50126 as these are the ones most commonly observed in password sprays
df = df.filter(col("ResultType").isin(["0", "50055", "50126"]))
print(f"Row count after filtering: {df.count()}")

### Deduplication 

Exported logs may occasionally contain a small amount of duplication either due to the way in which they are collected or due to the data export process (see [data completeness for exported logs](https://docs.microsoft.com/azure/active-directory/develop/reference-aadsts-error-codes)).  
In general, duplicate rows should be removed prior to analysis, but in some cases, you may decide to postpone or omit de-duplication if duplicated rows are unlikely to impact your detection logic (especially as de-duping can be a very expensive operation depending on the size of your dataframe and the number of columns that comprise a unique key).

In [None]:
%%synapse

print(f'# Rows before de-duplication: {df.count()}')
df = df.dropDuplicates(subset=['Id'])
print(f'# Rows after de-duplication: {df.count()}')

### Data Parsing and Extraction

In this step, we
- Create a new column containing the IP prefix (if IP ASN is available, prefer to use this instead)
- Extract the "browser", "displayName" and "operatingSystem" fields from the "DeviceDetail" JSON column
- Extract the "city", "state", "longitude" and "latitude" fields from the "LocationDetails" JSON column
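
The plain-Python sketch below walks through the same three steps for a single record, to show the shape of the output. The record and all of its values are made up for illustration, and the helper name `ip_prefix_of` is hypothetical; the Spark cell that follows is what actually runs against your data.

```python
import json

# Hypothetical sign-in record; the values are invented for illustration
record = {
    "IPAddress": "203.0.113.42",
    "DeviceDetail": '{"browser": "Chrome 96.0", "displayName": "", "operatingSystem": "Windows 10"}',
    "LocationDetails": (
        '{"city": "Dublin", "state": "Dublin", '
        '"geoCoordinates": {"latitude": 53.35, "longitude": -6.26}}'
    ),
}


def ip_prefix_of(ip: str) -> str:
    """Return the first three octets of an IPv4 address string."""
    return ".".join(ip.split(".")[:3])


device = json.loads(record["DeviceDetail"])
location = json.loads(record["LocationDetails"])
coordinates = location.get("geoCoordinates", {})

# Flatten the nested JSON into top-level fields
parsed = {
    "IPPrefix": ip_prefix_of(record["IPAddress"]),
    "browser": device.get("browser"),
    "displayName": device.get("displayName"),
    "operatingSystem": device.get("operatingSystem"),
    "city": location.get("city"),
    "state": location.get("state"),
    "latitude": coordinates.get("latitude"),
    "longitude": coordinates.get("longitude"),
}
print(parsed)
```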

In [None]:
%%synapse

@pandas_udf(StringType())
def ip_prefix(s: pd.Series) -> pd.Series:
    """Given a series of IP address strings, return the first three octets of each IP as a series of strings"""
    return s.apply(lambda ip: '.'.join(ip.split('.')[:3]))

# Create a new calculated column, "IPPrefix", with the first three octets of the IP address
df = df.withColumn('IPPrefix', ip_prefix(col('IPAddress')))

# Fields to extract from the "DeviceDetail" and "LocationDetails" JSON columns
device_detail_cols = ['browser', 'displayName', 'operatingSystem']
location_details_cols = ['city', 'state', 'geoCoordinates']

# Parse the JSON columns and extract the specified fields as new columns
df = (
    df
    .select('*', F.json_tuple(col('DeviceDetail'), *device_detail_cols).alias(*device_detail_cols))
    .select('*', F.json_tuple(col('LocationDetails'), *location_details_cols).alias(*location_details_cols))
    .select('*', F.json_tuple(col('geoCoordinates'), 'latitude', 'longitude').alias('latitude', 'longitude'))
    .drop('DeviceDetail', 'LocationDetails', 'geoCoordinates')
)
df.show()

## Feature Encoding

We now [one-hot encode](https://en.wikipedia.org/wiki/One-hot#Machine_learning_and_statistics) our categorical features using Spark's MLlib 
(stringing together the [`StringIndexer`](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.StringIndexer.html#pyspark.ml.feature.StringIndexer) transform followed by the [`OneHotEncoder`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html) transform).


First we use the [`StringIndexer`](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.StringIndexer.html#pyspark.ml.feature.StringIndexer) class to map the categorical feature columns 
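to columns of category indices. For each column, the indices run from 0 to one less than the number of distinct values observed (with `handleInvalid="keep"`, as used below, one additional index is reserved for values not seen during fitting).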
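
The index mapping can be sketched in plain Python as follows. This is a simplified stand-in, not the MLlib implementation: it assumes alphabetical ordering of labels and a single extra index shared by unseen values, and the function names `fit_ordinal_encoder` and `encode` are hypothetical.

```python
def fit_ordinal_encoder(values):
    """Map each distinct value to an index, in ascending alphabetical order."""
    labels = sorted(set(values))
    return {label: index for index, label in enumerate(labels)}


def encode(value, mapping):
    """Return the index for a value; unseen values share one extra index."""
    return mapping.get(value, len(mapping))


# Hypothetical values of the "browser" feature
browsers = ["Edge", "Chrome", "NODATA", "Chrome", "Firefox"]

mapping = fit_ordinal_encoder(browsers)
encoded = [encode(b, mapping) for b in browsers]
unseen_index = encode("Safari", mapping)  # not present when the encoder was fitted

print(mapping)
print(encoded, unseen_index)
```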

In [None]:
%%synapse

# The columns to be ordinal-encoded (replace IPPrefix with AutonomousSystemNumber if it is available in your data!)
feat_cols = ["AppDisplayName", "ClientAppUsed", "browser", "displayName", "operatingSystem", "IPPrefix", "Location", "state", "city"]  # We could include sign-in time (bucketed into hours) here as well
# The output column names for the ordinal-encoded data
ord_enc_cols = ["OrdEnc_" + col for col in feat_cols]

# So that missing data is encoded as its own category, we replace empty strings and nulls with "NODATA"
df = df.replace("", None, subset=feat_cols).fillna("NODATA", subset=feat_cols)

# Instantiate, fit, then transform
ordinal_encoder = StringIndexer(inputCols=feat_cols, outputCols=ord_enc_cols, handleInvalid="keep", stringOrderType="alphabetAsc")
ordinal_encoder = ordinal_encoder.fit(df)
encoded_df = ordinal_encoder.transform(df).drop(*feat_cols)

# Create a list of `IndexToString` objects which can be used to convert category indices back to the original values
# (the decoder reads the ordinal-encoded column and writes the original feature column name)
ordinal_decoders = [IndexToString(inputCol=in_col, outputCol=out_col, labels=labels) for (out_col, in_col, labels) in zip(feat_cols, ord_enc_cols, ordinal_encoder.labelsArray)]

encoded_df.show()

At this stage, we also split our dataframe into two: one containing successful sign-ins and the other containing failed sign-ins. Doing this here will be helpful later on.

In [None]:
%%synapse

success_df = encoded_df.filter(col("ResultType") == "0")  # Result type 0 indicates a successful login
fail_df = encoded_df.filter(col("ResultType").isin(["50055", "50126"]))  # We focus on failed login types 50055 and 50126 as these are the ones most commonly observed in password sprays
print("# Successful logins:", n_successful:=success_df.count())
print("# Failed logins:", n_failed:=fail_df.count())

Finally, we use the [`OneHotEncoder`](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.feature.OneHotEncoder.html) class to convert our ordinal-encoded columns of category indices to one-hot binary vectors.
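
As a minimal illustration of what a one-hot vector looks like when the last category is dropped (the `dropLast=True` option used in the cell below), consider the following plain-Python sketch. The `one_hot` helper and the sample indices are hypothetical, and real MLlib output uses sparse vectors rather than lists.

```python
def one_hot(index, n_categories, drop_last=True):
    """Return a one-hot list for a category index.

    With drop_last=True the vector has n_categories - 1 slots and the
    last category is represented by the all-zeros vector.
    """
    size = n_categories - 1 if drop_last else n_categories
    vector = [0] * size
    if index < size:
        vector[index] = 1
    return vector


# Category indices for one feature across five hypothetical sign-ins (4 categories)
indices = [1, 0, 3, 0, 2]
vectors = [one_hot(i, n_categories=4) for i in indices]

# Summing the vectors column-wise gives the number of sign-ins per category
category_counts = [sum(column) for column in zip(*vectors)]
print(vectors)
print(category_counts)
```

Column-wise sums of these vectors give per-category counts, which is the quantity the anomaly detection step needs from the successful sign-ins.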

In [None]:
%%synapse

# The output column names for the one-hot encoded columns
ohe_cols = ["OHE_" + col for col in feat_cols]

# Instantiate, fit, then transform
ohe = OneHotEncoder(inputCols=ord_enc_cols, outputCols=ohe_cols, dropLast=True)
ohe = ohe.fit(success_df)
ohe_df = ohe.transform(success_df).drop(*ord_enc_cols)

ohe_df.show()

## Detect Anomalous Fields for Each Failed Sign-In

The first step is to apply anomaly detection to each column of each failed sign in - we want to end up with a table that looks like this:

| Sign-in ID | Is _Country_ anomalous? | Is _City_ anomalous? | Is _OS_ anomalous? | Is _Browser_ anomalous? | Is _App Display Name_ anomalous? | etc. |
|------------|-------------------------|----------------------|--------------------|-------------------------|----------------------------------|------|
| 1          | **True**                | **True**             | False              | False                   | False                            | ...  |
| 2          | False                   | False                | False              | **True**                | **True**                         | ...  |
| 3          | False                   | **True**             | **True**           | False                   | False                            | ...  |


We model each of our features as categorical random variables with the categories being the set of unique values observed from all sign-in attempts (both successful and failed). We then use Bayesian parameter estimation with the set of successful sign-ins to learn the true distributions for "good" (i.e. non-malicious) sign-in attempts. 
(Since we obviously don’t have perfect good vs. malicious labels for all sign-ins, we are using successful sign-ins as a proxy for good sign-ins).

Mathematically, we model the features as independent categorical variables with symmetric Dirichlet priors with concentration parameter, $\alpha$. This leads us to estimate the probability of feature $i$ taking value $c$ as
<br><br>
$$
\hat{p}_{i,c} = \frac{N_{i,c} + \alpha}{N + \alpha K_i}
$$

where $N_{i,c}$ is the number of times that feature $i$ takes the value $c$ in the dataset of successful sign-ins, $N$ is the total number of successful sign-ins, and $K_i$ is the number of available categories of feature $i$ (as observed from both successful and failed sign-ins). <br>
Here, $\alpha$ acts as a _smoothing parameter_ (see [Additive/Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing)) - increasing $\alpha$ will cause the algorithm to classify fewer values as being anomalous 
(in particular values which haven't been observed in successful sign-ins are less likely to be classed as anomalous).
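
The effect of $\alpha$ can be seen in a small worked example. The sketch below applies the estimate above to made-up counts for a single feature; the function name `smoothed_probabilities` and the counts are assumptions for illustration only.

```python
def smoothed_probabilities(counts, alpha):
    """Estimate p_c = (N_c + alpha) / (N + alpha * K) for each category c."""
    total = sum(counts)   # N: number of successful sign-ins
    k = len(counts)       # K: number of categories for this feature
    return [(n + alpha) / (total + alpha * k) for n in counts]


# Hypothetical counts of successful sign-ins per city.
# The last city was only ever seen in failed sign-ins, so its count is 0.
city_counts = [95, 4, 1, 0]

estimates = {alpha: smoothed_probabilities(city_counts, alpha) for alpha in (0.5, 2.0, 10.0)}
for alpha, probs in estimates.items():
    print(alpha, [round(p, 4) for p in probs])
```

The estimated probability of the unseen city grows as $\alpha$ increases, which is why larger values of $\alpha$ make unseen values less likely to fall below the anomaly threshold.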

### Setting an Anomaly Threshold on Probabilities

When determining whether values are anomalous, we can't just set a static threshold on the estimated probabilities (i.e. if the likelihood of a value is less than $p$, class it as an anomaly) - what constitutes a good threshold will depend on the distribution of the observed values for that feature. 
For example, suppose we observe 20 different cities (derived from GeoIP data), and 95% of successful sign-ins are from city 1, 4% are from city 2, and the remaining 1% are spread across cities 3 - 20. Then, we would probably want to class cities 2 - 20 as anomalous. 
Now suppose that we instead observed the following distribution: 24% of successful sign-ins are from city 1, and 4% of successful sign-ins are from each of cities 2 - 20. In this case, we would not want to class cities 2 - 20 as being anomalous 
_even though they are below the same threshold as in the first scenario_, since this would mean saying that 76% of all sign-ins had an anomalous sign-in location - this would make for a very noisy approach!

Instead, we set thresholds dynamically on a per-feature basis by using basic outlier detection - specifically, we class a value as anomalous when its $\log(p)$ is more than $k$ standard deviations below the mean ($k = 2$ by default, but can be tuned). This is equivalent to standard-scaling the column of log-probabilities before using a static threshold.
(This threshold can also be given an information-theoretic interpretation, since, for example, $-\mathbb{E}[\log(p)]$ is just entropy.)
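
The two city scenarios can be checked numerically. The sketch below is a plain-Python rendering of the per-feature threshold, under the assumption that the mean and standard deviation of $\log(p)$ are weighted by how often each value was observed; the function name `anomalous_flags` and the counts (chosen to match the percentages in the example) are illustrative.

```python
import math


def anomalous_flags(counts, alpha=2.0, k=2.0):
    """Flag categories whose smoothed log-probability lies more than k
    count-weighted standard deviations below the count-weighted mean."""
    total = sum(counts)
    n_categories = len(counts)
    log_probs = [
        math.log(n + alpha) - math.log(total + alpha * n_categories)
        for n in counts
    ]
    mean = sum(n * lp for n, lp in zip(counts, log_probs)) / total
    variance = sum(n * (lp - mean) ** 2 for n, lp in zip(counts, log_probs)) / total
    threshold = mean - k * math.sqrt(variance)
    return [lp < threshold for lp in log_probs]


# Scenario 1: 95% from city 1, 4% from city 2, 1% shared among cities 3 - 20
scenario_1 = [3420, 144] + [2] * 18
# Scenario 2: 24% from city 1, 4% from each of cities 2 - 20
scenario_2 = [864] + [144] * 19

flags_1 = anomalous_flags(scenario_1)
flags_2 = anomalous_flags(scenario_2)
print("Scenario 1 anomalous cities:", [i + 1 for i, f in enumerate(flags_1) if f])
print("Scenario 2 anomalous cities:", [i + 1 for i, f in enumerate(flags_2) if f])
```

In the first scenario cities 2 - 20 fall below the threshold, while in the second scenario no city does, even though city 2 has roughly the same estimated probability in both.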


First we run the anomalous feature detection algorithm described above - this produces a dataframe in which the rows comprise a (failed) sign-in ID and boolean flags for each sign-in property denoting whether or not that property took an anomalous value. 

In [None]:
%%synapse

# Run the anomalous feature detection

# We could use a library such as scikit-learn's Naive Bayes classifier for this (https://scikit-learn.org/stable/modules/naive_bayes.html)
# but using Spark MLlib directly allows Spark to parallelize this far more efficiently (https://spark.apache.org/mllib/)

n_feats = len(ohe_cols)
n_categories = np.array([len(labels) for labels in ordinal_encoder.labelsArray])
category_counts = [np.array(vec) for vec in ohe_df.select([Summarizer.numNonZeros(col(c)) for c in ohe_cols]).collect()[0]]

# We estimate the underlying categorical distribution for each feature (assuming independence of features and using a Dirichlet(alpha) prior)
alpha = 2.0  # smoothing parameter - the concentration parameter from the Dirichlet prior
log_probs = []
for i in range(n_feats):
    feat_category_counts = category_counts[i]
    n_feat_categories = n_categories[i]
    log_probs.append(np.log(feat_category_counts + alpha) - np.log(n_successful + (alpha * n_feat_categories)))

# We call a value anomalous if it is below a certain probability threshold. However, we cannot just use a static threshold -
# e.g. if each "IPPrefix" value only appears once for a given user, then each will have a low probability, but we don't want to class all of the IP prefixes as anomalous
sigmage = 2.0
thresholds = []
for feat_category_counts, feat_log_probs in zip(category_counts, log_probs):
    mean = np.average(feat_log_probs, weights=feat_category_counts)
    variance = np.dot(feat_category_counts, (feat_log_probs - mean) ** 2) / feat_category_counts.sum()
    std = np.sqrt(variance)
    thresholds.append(mean - (sigmage * std))

# For each feature, we can determine whether or not the i-th categorical value is anomalous
anomalous_values_masks = [feat_log_prob < threshold for (feat_log_prob, threshold) in zip(log_probs, thresholds)]

# Since we have already calculated the {value -> is_anomalous_boolean} mapping, we can create a Spark map type to apply this mapping to our dataframe
spark_maps = [F.create_map([F.lit(x) for x in chain(*enumerate(mask.tolist()))]) for mask in anomalous_values_masks]
is_anom_cols = []
for i, enc_col in enumerate(ord_enc_cols):
    new_col_name = 'IsAnom_' + feat_cols[i]
    is_anom_cols.append(new_col_name)
    fail_df = fail_df.withColumn(new_col_name, spark_maps[i][col(enc_col).cast(IntegerType())])

# We only "drop" columns here for the purpose of the output
fail_df.drop(*ord_enc_cols).show(truncate=False)

Now we collapse our binary "IsAnom\_\*" columns into a single column of binary vectors representing which features are anomalous for each failed sign-in. (This restructuring of the data will be more convenient for later analysis.)

In [None]:
%%synapse

# Collapse the binary "is_anom_*" columns into a single column of binary vectors, then convert that column to a 2D numpy array (of shape (n_failed_signins, n_features))
vec_assembler = VectorAssembler(inputCols=is_anom_cols, outputCol="anomalous_feats_mask")
ids, anomalous_feat_masks = np.array(
    vec_assembler
    .transform(fail_df)
    .select("Id", "anomalous_feats_mask")
    .collect()
).T

# Convert Spark sparse vectors to numpy arrays
anomalous_feat_masks = np.array([np.array(sparse_vec, dtype=int) for sparse_vec in anomalous_feat_masks])

# Output a sample of the binary numpy array
print(anomalous_feat_masks[:5])

## Cluster Failed Sign-Ins

The core hypothesis for this detection algorithm is that the distribution of anomalous features looks very different depending on how the sign-in was generated - 
in particular, sign-ins from a password spray campaign in which attackers use tooling to spoof multiple sign-in properties will have a distinctive "fingerprint" of features that are often anomalous together.<br>
For example, suppose that all failed sign-in attempts come from three sources: legitimate user error, password spray campaign 1 and password spray campaign 2. For each of these classes, the probability of a given feature being anomalous may look like this:


| Source                | $\mathbb{P}(\text{Country is anomalous})$ | $\mathbb{P}(\text{City is anomalous})$ | $\mathbb{P}(\text{OS is anomalous})$ | $\mathbb{P}(\text{Browser is anomalous})$ | $\mathbb{P}(\text{App Display Name is anomalous})$ | etc. |
| --------------------- | ----------------------------------------- | -------------------------------------- | ------------------------------------ | ----------------------------------------- | -------------------------------------------------- | ---- |
| Legitimate user error | 0.02                                      | 0.1                                    | 0.01                                 | 0.2                                       | 0.25                                               | ...  |
| PW Spray 1            | **_0.7_**                                 | **_0.95_**                             | **_0.6_**                            | **_0.9_**                                 | **_0.85_**                                         | ...  |
| PW Spray 2            | 0.1                                       | **_0.8_**                              | 0.1                                  | **_0.6_**                                 | **_0.8_**                                          | ...  |


From the hypothetical probabilities in the table, we can see that, for each class of sign-ins, the set of features which are usually anomalous forms a fingerprint for the class.

Obviously, in practice, the sources of sign-ins are _latent_ variables - i.e. they cannot be observed directly. Instead, we work backwards from our dataset of failed sign-ins and associated anomalous features to try to detect the latent classes and the associated probabilities of each feature taking an anomalous value. 
From our hypothesis, we hope that, if a password spray campaign is present in our data, it will correspond to one of the detected clusters of failed sign-ins.

Mathematically, we do this by modelling our dataset as being generated from a [Bernoulli mixture model](https://en.wikipedia.org/wiki/Latent_class_model). We then perform [variational Bayesian inference](https://en.wikipedia.org/wiki/Variational_Bayesian_methods) to try to detect the presence of latent classes.
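
As an illustration of what the mixture model expresses, the sketch below computes the posterior probability of each source for a given binary mask of anomalous features, assuming the per-class probabilities were already known. The fingerprints are the hypothetical values from the table above (first five features only), and the mixing weights are an additional assumption made for this example. In the notebook itself, both are unknown and are inferred from the data.

```python
import math

# Hypothetical per-class probabilities that each of five features is anomalous
# (taken from the illustrative table above; they are not learned values)
fingerprints = {
    "Legitimate user error": [0.02, 0.10, 0.01, 0.20, 0.25],
    "PW Spray 1": [0.70, 0.95, 0.60, 0.90, 0.85],
    "PW Spray 2": [0.10, 0.80, 0.10, 0.60, 0.80],
}
# Hypothetical mixing weights: the share of failed sign-ins from each source
weights = {"Legitimate user error": 0.90, "PW Spray 1": 0.05, "PW Spray 2": 0.05}


def class_posteriors(mask):
    """Return P(class | mask) for a binary mask of anomalous features."""
    joint = {}
    for name, probs in fingerprints.items():
        # Bernoulli likelihood: p if the feature is anomalous, (1 - p) otherwise
        log_lik = sum(math.log(p if bit else 1.0 - p) for bit, p in zip(mask, probs))
        joint[name] = weights[name] * math.exp(log_lik)
    evidence = sum(joint.values())
    return {name: value / evidence for name, value in joint.items()}


def most_likely_class(mask):
    """Return the class with the highest posterior probability for the mask."""
    posteriors = class_posteriors(mask)
    return max(posteriors, key=posteriors.get)


for mask in ([0, 0, 0, 0, 0], [1, 1, 1, 1, 1], [0, 1, 0, 1, 1]):
    print(mask, "->", most_likely_class(mask))
```

Even with a strong prior towards legitimate user error, a mask that matches a spray fingerprint is attributed to that spray, which is the behaviour the clustering step relies on.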

In the next cell, we use the `bayespy` Python package to set up the Bernoulli mixture model and run variational Bayesian inference - see [Bernoulli mixture model — BayesPy Documentation](https://www.bayespy.org/examples/bmm.html).

> **Notes:** 
> - Set the number of clusters to look for. The true number of groups is unknown to us, so we use an upper bound for the number of clusters we expect to be present (10 is a reasonable number to start with) - the algorithm may assign 0 weight to some clusters if this is too large.
> - This step is not deterministic - rerunning may give slightly different clusterings! If this causes issues, we can simply re-run the variational Bayesian inference multiple times and select the model with the highest ELBO value.

In [None]:
%%synapse

# We will create a Bernoulli mixture model. The true number of groups is unknown to us, so we use a large enough number of clusters.

# Here, Z defines the group assignments and P, the anomalous feature probability patterns for each group.

n_clusters = 10  # 10 is a reasonable number to start with - this is the maximum number of clusters the algorithm will be allowed to find
max_iterations = 5000  # Limit the number of iterations of the variational Bayesian inference algorithm

# We use the categorical distribution for the group assignments and give the group assignment probabilities an uninformative Dirichlet prior (using a small concentration parameter helps avoid learning spurious classes)
R = Dirichlet(n_clusters * [1e-5], name='R')
Z = Categorical(R, plates=(n_failed, 1), name='Z')

# Each group has a probability of a yes answer for each question. These probabilities are given beta priors (the beta distribution is the conjugate prior for the Bernoulli distribution)
P = Beta([0.5, 0.5], plates=(n_feats, n_clusters), name='P')

# This is the overall mixture model created from the components defined above
X = Mixture(Z, Bernoulli, P)

# This is the variational Bayesian inference class object
Q = VB(Z, R, X, P)

# Perform the inference
P.initialize_from_random()
X.observe(anomalous_feat_masks)
Q.update(repeat=max_iterations, verbose=True)
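
Because the inference is not deterministic, the note above suggests re-running it and keeping the run with the highest ELBO. The sketch below shows that selection pattern in isolation. `best_of_restarts`, `fit_once` and `toy_fit` are hypothetical names, and the toy fit only stands in for a real run. In the notebook, each run would rebuild `R`, `Z`, `P` and `X`, call `Q.update(...)`, and report the final lower bound. BayesPy's `VB` object is assumed to expose this via `compute_lowerbound()`, so check the BayesPy documentation before relying on that name.

```python
import random


def best_of_restarts(fit_once, n_restarts=5, seed=0):
    """Run `fit_once(rng)` several times and keep the result with the highest score.

    `fit_once` is a hypothetical callable returning a (model, score) pair,
    where the score plays the role of the ELBO (larger is better).
    """
    rng = random.Random(seed)
    best_model, best_score = None, float("-inf")
    for _ in range(n_restarts):
        model, score = fit_once(rng)
        if score > best_score:
            best_model, best_score = model, score
    return best_model, best_score


def toy_fit(rng):
    """Stand-in for one inference run: returns a fake model and a fake ELBO."""
    score = -rng.uniform(100.0, 200.0)
    return {"elbo": score}, score


model, score = best_of_restarts(toy_fit, n_restarts=10)
print(model, score)
```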

### Visualize Clusters

We use [Hinton diagrams](https://scipy-cookbook.readthedocs.io/items/Matplotlib_HintonDiagrams.html) to visually represent the learned clusters. Areas of filled squares represent probabilities (and non-filled squares are used to show uncertainty).

The first diagram shows the probabilities that a randomly selected failed sign-in will be assigned to that cluster by our model (the areas of the squares are proportional to the cluster assignment probabilities).

In the second diagram, columns represent clusters and rows represent features, so, for example, a large white square in the 2nd column, 4th row would indicate that failed logins in cluster #2 are likely to have an unusual value for feature #4.


In [None]:
%%synapse

# Plot Hinton diagrams

# Number of clusters that the model has learned
n_clusters_learned = np.count_nonzero(np.exp(R.u))
print(f'{n_clusters_learned} non-empty clusters learned.\n')

bpplt.hinton(R)
print(f'"Size" of each of the {n_clusters} clusters', '(The areas of the squares are proportional to the cluster assignment probabilities)', sep='\n')
bpplt.pyplot.show()
print('\n')
# Use this to retrieve the exact cluster assignment probabilities:
# np.exp(R.u)

print(
    'Probability that a feature takes an anomalous value per cluster',
    f'(Columns represent clusters and rows represent features - e.g., a large white square in the 2nd column, 4th row would indicate that failed logins in cluster 2 are likely to have an unusual value for "{feat_cols[3]}" (feature 4))',
    sep='\n'
)
print('Features (rows):', feat_cols)
bpplt.hinton(P)
bpplt.pyplot.show()
# Use this to retrieve the exact probabilities:
# np.exp(P.u)

## Prune Clusters

We first use our learned model to assign each failed sign-in to a cluster along with the associated probability of the sign-in belonging to the cluster.

In [None]:
%%synapse

# Append columns to the dataframe of failed logins giving the assigned cluster number and associated confidence (probability)

# Extract the cluster assignment probability mass functions for each failed sign-in
# `assignment_pmfs` is an array of shape (n_failed_signins, n_clusters)
assignment_pmfs = Z.u[0][:, 0, :]

# The assigned cluster for each failed sign-in is the one with the greatest probability
assigned_clusters = assignment_pmfs.argmax(axis=1).tolist()  # clusters are indexed from 0 to (n_clusters - 1)
assignment_probs = assignment_pmfs.max(axis=1).tolist()

# Create a dataframe of (Signin ID, Assigned Cluster, Cluster Assignment Probability) rows
assignments_df = spark.createDataFrame(zip(ids, assigned_clusters, assignment_probs), ('Id', 'cluster_id', 'assignment_probability'))

# Join the cluster assignment dataframe back to the rest of the sign in data
fail_df = fail_df.join(assignments_df, on='Id', how='inner')
fail_df.drop(*ord_enc_cols).show(5)

Now we do some pruning of the learned clusters to remove those which are unlikely to represent the type of password spray activity we are looking for.

First we set a confidence threshold to prune failed logins included in each cluster **(intra-cluster pruning)**. We then prune clusters by

- Setting a minimum size for clusters of interest
- Setting a minimum threshold on the number of features consistently taking anomalous values within a cluster

> **Note:** The thresholds to use will depend very much on the data on which the algorithm is being run; start low, and increase the thresholds if results are too noisy.

In [None]:
%%synapse

# Determine how many features consistently take anomalous values within each cluster
p = np.exp(P.u)[0, ..., 0]  # shape = (n_feats, n_clusters)
consistently_anomalous_threshold = 0.4
n_anomalous_features_per_cluster = (p > consistently_anomalous_threshold).sum(axis=0)

# Prune clusters which do not have enough anomalous features to look like the type of password spray activity for which we are searching
cluster_min_anomalous_feats = 2
clusters_to_keep = (n_anomalous_features_per_cluster >= cluster_min_anomalous_feats).nonzero()[0].tolist()

# Prune points within clusters and then remove clusters that are too small
cluster_min_confidence_threshold = 0.4
min_avg_attempts_per_day = 0
cluster_min_size = min_avg_attempts_per_day * lookback_days

clusters_df = (
    fail_df
    .filter(col('cluster_id').isin(clusters_to_keep))
    .filter(col('assignment_probability') >= cluster_min_confidence_threshold)
    .groupby('cluster_id')
    .agg(F.count(col('Id')).alias('cluster_size'))
    .filter(col('cluster_size') >= cluster_min_size)
)

# We now have our candidate low and slow clusters! We can filter the failed sign-in logs
candidate_pw_spray_clusters = [row[0] for row in clusters_df.select(col('cluster_id')).collect()]
low_and_slow_candidates = fail_df.filter(col('cluster_id').isin(candidate_pw_spray_clusters))
print('Total number of candidate failed sign in attempts:', low_and_slow_candidates.count())

clusters_df.show()
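
The pruning logic in the cell above can be traced by hand on a toy example. The sketch below applies the same three checks (enough consistently anomalous features, a minimum assignment confidence, and a minimum cluster size) in plain Python. All of the numbers are hypothetical and chosen only so that each check removes something.

```python
from collections import Counter

# Hypothetical learned probabilities, shape (n_feats, n_clusters):
# p[f][c] = probability that feature f is anomalous in cluster c
p = [
    [0.02, 0.90, 0.05],
    [0.10, 0.85, 0.60],
    [0.01, 0.70, 0.03],
]
# Hypothetical (cluster_id, assignment_probability) pairs, one per failed sign-in
assignments = [(0, 0.99), (0, 0.95), (1, 0.90), (1, 0.80), (1, 0.35), (2, 0.70)]

consistently_anomalous_threshold = 0.4
cluster_min_anomalous_feats = 2
cluster_min_confidence_threshold = 0.4
cluster_min_size = 2

n_clusters = len(p[0])
# Count the features that are usually anomalous in each cluster
n_anomalous = [
    sum(row[c] > consistently_anomalous_threshold for row in p)
    for c in range(n_clusters)
]
# Keep only clusters with enough consistently anomalous features
clusters_to_keep = {
    c for c in range(n_clusters) if n_anomalous[c] >= cluster_min_anomalous_feats
}

# Intra-cluster pruning: drop low-confidence sign-ins, then drop small clusters
sizes = Counter(
    c for c, prob in assignments
    if c in clusters_to_keep and prob >= cluster_min_confidence_threshold
)
candidate_clusters = sorted(c for c, size in sizes.items() if size >= cluster_min_size)
print(candidate_clusters)
```

Here cluster 0 looks like ordinary user error (no consistently anomalous features), cluster 2 has only one such feature, and cluster 1 survives with two confidently assigned sign-ins.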

We now have our candidate low and slow password spray campaigns! These campaigns/clusters will be further pruned when we use `msticpy` for specific analysis and TI enrichment of these campaigns.

## Export Results to ADLS

At this point, we have all the data that we need from the big data analytics and ML steps using Spark, and can write the data back to the data lake before stopping the Spark session to minimize compute cost.  
This will allow the data to be read into the AML notebook context, where we further enrich, analyze and visualize these outputs before writing back to Sentinel.

The following outputs will be persisted:

1. **Full _SigninLogs_ rows for candidate password spray sign-ins**
2. **Aggregated sign-in timestamps** - these will be used for some time series visualizations using `msticpy`
3. **Aggregated sign-in locations** - these will be used for geo-plotting using `msticpy`
4. **Various "baseline" statistics** - these will be used as part of reporting back to Sentinel
5. **Sample of successful sign-ins** - this will be used in MSTICPy visualizations and as part of reporting back to Sentinel

Each of the above outputs will be saved as a single JSON file in ADLS.

### Export Candidate Password Sprays

In [None]:
%%synapse

from pathlib import PurePosixPath

base_dir_name = 'low_and_slow_pw_spray_ml'  # optionally add a suffix if you want to avoid overwriting results from a previous run
base_path = PurePosixPath(adls_path, base_dir_name)

# Candidate low and slow password sprays
low_and_slow_candidates_path = base_path/'low_and_slow_candidates'
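# Write the candidate sign-ins themselves; the writer expects the path as a string
low_and_slow_candidates.coalesce(1).write.format('json').save(str(low_and_slow_candidates_path))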
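
One caveat when building output locations with `PurePosixPath`: if `adls_path` is a full `abfss://` URI, `pathlib` collapses the double slash after the scheme, and the resulting string may no longer resolve as a URI. The sketch below shows the effect and one possible alternative based on plain string joining. `join_uri` is a hypothetical helper and `example_adls_path` is a made-up location.

```python
from pathlib import PurePosixPath


def join_uri(base_uri, *parts):
    """Join path segments onto a URI without touching the '//' after the scheme."""
    segments = [base_uri.rstrip("/")] + [part.strip("/") for part in parts]
    return "/".join(segments)


# Hypothetical ADLS location
example_adls_path = "abfss://container@account.dfs.core.windows.net/"

# pathlib treats the URI as an ordinary path and drops one of the slashes
print(str(PurePosixPath(example_adls_path, "low_and_slow_pw_spray_ml")))

# Plain string joining keeps the scheme separator intact
print(join_uri(example_adls_path, "low_and_slow_pw_spray_ml", "low_and_slow_candidates"))
```

If `adls_path` is a mount point or a relative path instead, `PurePosixPath` behaves as expected and no change is needed.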

### Export Aggregated Data/Statistics

In [None]:
%%synapse

# Aggregated sign-in timestamps
timestamp_df = (
    success_df
    .select(F.date_trunc('hour', 'TimeGenerated').alias('TimeGenerated'))
    .groupBy('TimeGenerated')
    .count()
    .orderBy('TimeGenerated')
)
signin_day_df = (
    success_df
    .select(F.dayofweek('TimeGenerated').alias('day_of_week'))
    .groupBy('day_of_week')
    .count()
    .orderBy('day_of_week')
)
signin_hour_df = (
    success_df
    .select(F.hour('TimeGenerated').alias('hour'))
    .groupBy('hour')
    .count()
    .orderBy('hour')
)

# Most common successful sign-in locations (for geoplot)
locations_df = (
    success_df
    .groupBy(['latitude', 'longitude'])
    .count()
    .na.drop()
    .orderBy('count', ascending=False)
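    .limit(5000)  # limit() keeps this as a DataFrame so it can be written out below (head() would return a list of Rows)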
)

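# Entropy (not normalized) per feature (this is a measure of "variability")
# Computed one feature at a time, since each feature has a different number of categories
feat_entropies = dict(zip(feat_cols, [float(entropy(counts)) for counts in category_counts]))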


In [None]:
%%synapse

# Persist aggregated data

from pathlib import PurePosixPath

dataframe_output_dirs = [
    # (dataframe, output directory name)
    (timestamp_df, 'signin_times'),
    (signin_day_df, 'signin_days'),
    (signin_hour_df, 'signin_hours'),
    (locations_df, 'top_locations'),
]

for data, output_dir in dataframe_output_dirs:
    path = base_path/output_dir
    data.coalesce(1).write.format('json').save(str(path))

In [None]:
import json

# Open the file for writing and pass the file handle to json.dump
with open(str(base_path/'entropy_per_feature.json'), 'w') as f:
    json.dump(feat_entropies, f)

### Export Baseline Sample

In [None]:
%%synapse

# Retain a sample of successful logins to use as a baseline for later analysis
baseline_sample_size = 10000
# sample() expects a fraction between 0 and 1, so derive it from the row count
sample_fraction = min(1.0, baseline_sample_size / max(success_df.count(), 1))
baseline_sample_df = success_df.sample(fraction=sample_fraction)

baseline_sample_path = base_path/'successful_login_baseline_sample'
baseline_sample_df.coalesce(1).write.format('json').save(str(baseline_sample_path))

## Stop Spark Session

In [None]:
%synapse stop

# 3. Analyze Clusters on AML Compute

## Export results from ADLS to local filesystem

In [None]:
def initialize_storage_account(storage_account_name, storage_account_key):
    try:
        global service_client
        service_client = DataLakeServiceClient(
            account_url='{}://{}.dfs.core.windows.net'.format(
                'https', storage_account_name
            ),
            credential=storage_account_key,
        )
    except Exception as e:
        print(e)


def list_directory_contents(container_name, input_path, file_type):
    try:
        file_system_client = service_client.get_file_system_client(
            file_system=container_name
        )
        paths = file_system_client.get_paths(path=input_path)

        pathlist = []
        for path in paths:
            if path.name.endswith(file_type):
                pathlist.append(path.name)
        return pathlist

    except Exception as e:
        print(e)


import os


def download_file_from_directory(container_name, input_path, input_file):
    try:
        file_system_client = service_client.get_file_system_client(
            file_system=container_name
        )
        directory_client = file_system_client.get_directory_client(str(input_path))
        file_client = directory_client.get_file_client(str(input_file))

        # Save each download under a local folder named after its ADLS parent directory
        # (e.g. 'signin_times/output.json') so that outputs do not overwrite each other
        local_dir = os.path.basename(str(input_path).rstrip('/')) or '.'
        os.makedirs(local_dir, exist_ok=True)
        with open(os.path.join(local_dir, 'output.json'), 'wb') as local_file:
            local_file.write(file_client.download_file().readall())

    except Exception as e:
        print(e)


def json_normalize(input_file, output_file):
    resultList = []
    with open(input_file) as f:
        for jsonObj in f:
            resultDict = json.loads(jsonObj)
            resultList.append(resultDict)

    with open(output_file, 'w') as write_file:
        json.dump(resultList, write_file)
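
Spark's JSON writer produces line-delimited JSON (one object per line) rather than a single JSON array, which is why a helper like `json_normalize` above is useful before loading the files. The sketch below shows the same parsing step on a small, made-up export. `read_json_lines` is a hypothetical helper name.

```python
import json
import os
import tempfile


def read_json_lines(path):
    """Parse a file containing one JSON object per line into a list of dicts."""
    with open(path) as f:
        return [json.loads(line) for line in f if line.strip()]


# Hypothetical two-row export, in the line-delimited layout that Spark writes
with tempfile.TemporaryDirectory() as tmp_dir:
    part_file = os.path.join(tmp_dir, "part-00000.json")
    with open(part_file, "w") as f:
        f.write('{"hour": 9, "count": 120}\n{"hour": 10, "count": 95}\n')
    records = read_json_lines(part_file)

print(records)
```

Alternatively, `pd.read_json(path, lines=True)` reads line-delimited JSON directly into a dataframe.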

### Load the Data from ADLS

In the sections below, we provide the input details for the ADLS account and then use the functions above to connect, list contents and download the results locally.

If you need help locating the input details, follow the steps below:
- Go to https://web.azuresynapse.net and sign in to your workspace.
- In Synapse Studio, click Data, select the Linked tab, and select the container under Azure Data Lake Storage Gen2.
- Navigate to the folder within the container, right-click it and select Properties.
- Copy the ABFSS path, extract the details and map them to the input fields.
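
The last step above can also be done programmatically. The sketch below splits an ABFSS path of the form `abfss://<container>@<account>.dfs.core.windows.net/<path>` into the values needed for the input fields. `parse_abfss_path` is a hypothetical helper, and the example path is made up.

```python
from urllib.parse import urlsplit


def parse_abfss_path(abfss_path):
    """Split abfss://<container>@<account>.dfs.core.windows.net/<path> into its parts."""
    parts = urlsplit(abfss_path)
    if parts.scheme != "abfss" or "@" not in parts.netloc:
        raise ValueError(f"Not an ABFSS path: {abfss_path!r}")
    container, _, host = parts.netloc.partition("@")
    return {
        "container_name": container,
        "account_name": host.split(".")[0],
        "path": parts.path.lstrip("/"),
    }


# Hypothetical path copied from the Properties pane
details = parse_abfss_path(
    "abfss://sentinelfiles@mystorageacct.dfs.core.windows.net/low_and_slow_pw_spray_ml"
)
print(details)
```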


See the [View account access keys](https://docs.microsoft.com/azure/storage/common/storage-account-keys-manage?tabs=azure-portal#view-account-access-keys) documentation to find and retrieve the storage account keys for your ADLS account.

<p style="border: solid; padding: 5pt; color: black; background-color: #AA4000">
<b>Warning</b>: If you are storing secrets such as storage account keys in the notebook you should<br>
probably opt to store them either in the msticpyconfig file on the compute instance or in<br>
Azure Key Vault instead.<br>
Read more about using KeyVault
<a href=https://msticpy.readthedocs.io/en/latest/getting_started/msticpyconfig.html#specifying-secrets-as-key-vault-secrets >in the MSTICPY docs</a>
</p>

In [None]:
# Primary storage info
account_name = '<storage account name>'  # fill in your primary account name
container_name = '<container name>'  # fill in your container name
subscription_id = '<subscription id>'  # fill in your subscription id
resource_group = '<resource-group>'  # fill in the resource group of your Log Analytics workspace (NOT the resource group of the ADLS account)
workspace_name = '<Microsoft Sentinel/Log Analytics workspace name>'  # fill in your log analytics workspace name

input_path = (
    f'WorkspaceResourceId=/'
    f'subscriptions/{subscription_id}/'
    f'resourcegroups/{resource_group.lower()}/'
    f'providers/microsoft.operationalinsights/'
    f'workspaces/{workspace_name.lower()}/'
)
adls_path = f'abfss://{container_name}@{account_name}.dfs.core.windows.net/{input_path}/{workspace_name}'
adls_path = f"abfss://{container_name}@{account_name}.dfs.core.windows.net/"
dir_name = 'low_and_slow_pw_spray_ml'
# In production, make sure any keys are stored and retrieved securely (e.g. using Azure Key Vault) - don't store keys in plain text!
account_key = '<storage-account-key>'  # replace with your storage account key

In [None]:
# new_path = input_path + dir_name
import json
from pathlib import PurePosixPath

initialize_storage_account(account_name, account_key)
pathlist = list_directory_contents(container_name, dir_name, "json")

for path in pathlist:
    path = PurePosixPath(path)
    download_file_from_directory(container_name, path.parent, path.name)

baseline_times = pd.read_json('signin_times/output.json')
baseline_days = pd.read_json('signin_days/output.json')
baseline_hours = pd.read_json('signin_hours/output.json')
top_locations_df = pd.read_json('top_locations/output.json')
baseline_entropies = pd.read_json('entropy_per_feature/output.json')
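# Read each output from the directory it was exported to in the Spark session
clusters_df = pd.read_json('low_and_slow_candidates/output.json')
success_sample_df = pd.read_json('successful_login_baseline_sample/output.json')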
cluster_details = pd.read_json('cluster_details/output.json')
n_clusters = len(cluster_details)

## Analyze Clusters Using MSTICPy

Having used big data analytics and ML to reduce our SigninLogs data to a handful of candidate low and slow password spray clusters, we are now ready to investigate each of the generated clusters.

The two broad questions to try to answer at this stage are:
- Do the clusters represent likely (low and slow) password spray activity?
- Do the clusters exhibit any distinctive properties that will aid with remediation and/or attribution? (E.g. Do the sign-ins all use an unusual user agent that could be blocked?)<br>
  This information can be added to incidents written back to Sentinel.

In the following section, we use [MSTICPy](https://msticpy.readthedocs.io/en/latest/index.html)'s built-in security analytics tools to better understand each cluster. We only present a few general techniques here - your investigation may lead you down a different route.

### Visualize Clusters

The candidate low and slow password spray clusters have been generated based on the mix of features which are typically anomalous. We can plot charts for each candidate cluster showing, for each sign-in property/feature:

1. The number of sign-ins where that property/feature is anomalous
2. The "variability" of that property/feature

Together, these two properties "fingerprint" each cluster and can inform the direction of further hunting. For example, suppose a cluster is characterised by its sign-ins having anomalous "ClientAppUsed" and "Location" properties, 
and suppose that the "variability" of these properties is low within the cluster. This indicates that a relatively small number of anomalous client apps / sign-in locations are being used, which means that there is potential to write a rule-based detection on these static anomalous values.
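
As a rough illustration of the "variability" measure, the sketch below computes the Shannon entropy of the values a property takes within a cluster, using made-up "ClientAppUsed" values. `shannon_entropy` is a hypothetical helper. In the notebook this quantity is additionally normalized by the baseline entropy of the same property.

```python
import math
from collections import Counter


def shannon_entropy(values):
    """Shannon entropy (in nats) of the empirical distribution of `values`."""
    counts = Counter(values)
    n = sum(counts.values())
    return sum((c / n) * math.log(n / c) for c in counts.values())


# Hypothetical "ClientAppUsed" values for the sign-ins in two clusters
static_cluster = ["IMAP4"] * 40
varied_cluster = ["IMAP4", "POP3", "SMTP", "Browser"] * 10

print(shannon_entropy(static_cluster))  # a single repeated value has zero entropy
print(shannon_entropy(varied_cluster))  # four equally likely values give log(4)
```

A cluster like the first one, with an anomalous but static value, is the kind of case where a simple rule-based detection may be enough.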

In [None]:
is_anom_cols = [col for col in clusters_df.columns if col.startswith('IsAnom_')]
clusters_grouped = clusters_df.groupby('cluster_id')
num_anom_per_cluster = clusters_grouped[is_anom_cols].sum()
num_anom_per_cluster['label'] = '# Anomalous'
entropy_per_cluster = clusters_grouped.apply(lambda g: g.apply(entropy, axis=0)) / baseline_entropies
entropy_per_cluster['label'] = 'Entropy (normalized)'

bar_charts = []
for i in range(n_clusters):
    # After the groupby, "cluster_id" is the index of both dataframes
    bar_chart_data = pd.concat([
        num_anom_per_cluster[num_anom_per_cluster.index == i],
        entropy_per_cluster[entropy_per_cluster.index == i],
    ])
    bar_charts.append(bar_chart_data)

# Plot the i-th chart
i = 0
bar_charts[i].hvplot.bar(stacked=False, height=500)

### Time Series Analysis

A good indicator of low and slow password spray-like activity is regular patterns in the times of the candidate sign-ins. Although threat actors add some random noise to the schedule on which password spray sign-in attempts occur, when viewed as a whole, there is often still a distinctive uniformity to the time series of sign-in attempts as attackers endeavour to avoid lock-out.

In order to test sign-ins in each of our candidate clusters for "uniform spread" over time, we perform a [Kolmogorov-Smirnov goodness-of-fit test](https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.ks_1samp.html) against a uniform distribution. The output value will be between 0 and 1, with values closer to zero indicating that sign-in times are unlikely to have been generated from a uniform distribution.
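
To build intuition for what the test measures, the sketch below computes only the Kolmogorov-Smirnov *statistic*: the largest gap between the empirical distribution of the sign-in times and a uniform distribution over the same time range. This is a simplified stand-in written for illustration, and `ks_uniform_statistic` is a hypothetical helper. Note that the direction is reversed relative to the p-value reported by the notebook: a small distance means the times look uniform.

```python
def ks_uniform_statistic(times):
    """Kolmogorov-Smirnov distance between the empirical distribution of
    `times` and a uniform distribution on [min(times), max(times)]."""
    xs = sorted(times)
    n = len(xs)
    lo, hi = xs[0], xs[-1]
    if hi == lo:
        raise ValueError("Need at least two distinct timestamps")
    scaled = [(x - lo) / (hi - lo) for x in xs]
    # Largest gap between the empirical CDF (a step function) and the line y = x
    d_plus = max((i + 1) / n - x for i, x in enumerate(scaled))
    d_minus = max(x - i / n for i, x in enumerate(scaled))
    return max(d_plus, d_minus)


# Hypothetical sign-in times, in hours since the start of the lookback window
evenly_spread = [24 * day for day in range(10)]  # one attempt per day
bursty = [0, 1, 2, 3, 4, 100]                    # a burst, then one late attempt

print(ks_uniform_statistic(evenly_spread))  # small distance: close to uniform
print(ks_uniform_statistic(bursty))         # large distance: far from uniform
```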

In [None]:
# Output is between 0 and 1; roughly speaking, larger means more uniform.

def uniformity_metric(times: pd.Series):
    # Depending on your data, you might want to remove outliers first!
    max_time = times.max().to_numpy()
    min_time = times.min().to_numpy()
    cdf = lambda x: (x - min_time) / (max_time - min_time)
    _, p = ks_1samp(times, cdf)
    return p

# Compute uniformity checks
for i in range(n_clusters):
    times = clusters_df[clusters_df.cluster_id == i].TimeGenerated.dropna()
    p = uniformity_metric(times)
    print(f"Cluster {i} uniformity: {p}")

Similarly, normal sign-in activity will exhibit distinctive day/week seasonality which we can check for in our candidate low and slow password spray clusters.

In [None]:
# Output is between 0 and 1; roughly speaking, larger means that sign-ins from the candidate cluster follow a similar pattern to those from our baseline of successful sign-ins.

for i in range(n_clusters):
    times = clusters_df[clusters_df.cluster_id == i].TimeGenerated.dropna()

    # Test day-of-week seasonality
    days = times.dt.dayofweek
    _, days_distribution_similarity = ks_2samp(baseline_days, days)
    print(f"Likelihood that sign-in days from cluster {i} have a distribution matching the baseline: {days_distribution_similarity}")

    # Test hour-of-day seasonality
    hours = times.dt.hour
    _, hours_distribution_similarity = ks_2samp(baseline_hours, hours)
    print(f"Likelihood that sign-in hours from cluster {i} have a distribution matching the baseline: {hours_distribution_similarity}")
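
Two details are worth checking before relying on these comparisons. First, `ks_2samp` compares two samples of raw observations, whereas the baseline exported from Spark is aggregated into (value, count) rows. Second, Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), while pandas' `dt.dayofweek` uses 0 (Monday) to 6 (Sunday). The sketch below shows one possible way to reconcile both. The helper names and the example rows are hypothetical.

```python
def spark_to_pandas_dayofweek(spark_day):
    """Map Spark's dayofweek (1 = Sunday ... 7 = Saturday)
    to pandas' dt.dayofweek (0 = Monday ... 6 = Sunday)."""
    return (spark_day + 5) % 7


def expand_counts(rows):
    """Turn aggregated (value, count) rows into a flat list of samples."""
    samples = []
    for value, count in rows:
        samples.extend([value] * count)
    return samples


# Hypothetical aggregated baseline: (Spark day_of_week, number of sign-ins)
baseline_rows = [(1, 2), (2, 5), (7, 1)]
baseline_day_samples = expand_counts(
    (spark_to_pandas_dayofweek(day), count) for day, count in baseline_rows
)
print(baseline_day_samples)
```

The expanded list can then be passed to a two-sample test alongside the cluster's day-of-week values.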

#### Timeseries Plots

The above statistics do not capture the many different patterns we might see in attacker behaviour (especially as attackers use increasingly sophisticated techniques to avoid detection).  
A time-plot visualization can highlight patterns not captured by analytics.

Common things to look out for:

- Sign-in attempts spread out fairly uniformly over time. 
- Lack of day/week/month seasonal patterns
- Sign-ins on particularly unusual days (e.g. public holidays)

You can also modify the plot below to show just the sign-in day-of-week or hour-of-day.

In [None]:
from msticpy.vis.timeline import display_timeline_values

min_time_generated = (success_sample_df.TimeGenerated.min()).tz_localize(None)
max_time_generated = (success_sample_df.TimeGenerated.max()).tz_localize(None)

baseline_times["cluster_id"] = "BASELINE"

time_plot_data = pd.concat([clusters_df[["TimeGenerated", "cluster_id"]], baseline_times], axis=0)
time_plot_data = time_plot_data.groupby([pd.Grouper(key="TimeGenerated", freq="2T"), "cluster_id"], as_index=False).size()
display_timeline_values(
    data=time_plot_data,
    y="size",
    group_by="cluster_id",
)

### Sign-In Location Analysis

We can use msticpy's visualisation libraries to plot locations on a map. This can be particularly useful when looking at the distribution of anomalous sign-in attempts.

<div style="border: solid; padding: 5pt"><b>Note:</b>
    If your log source does not include GeoIP data, you can use msticpy's geolocation capabilities with the MaxMind database. You will need a MaxMind API key to download the database.
    <br>
    You may see the GeoLite driver downloading its database the first time you run this.
</div>
<details>
    <summary>Learn more about MSTICPy GeoIP providers...</summary>
    <p>
    <a href=https://msticpy.readthedocs.io/en/latest/data_acquisition/GeoIPLookups.html >MSTICPy GeoIP Providers</a>
    </p>
</details>
<br>

We use two plots to answer two questions in this section:

1. Are sign-in attempts generally from unusual locations as compared to the baseline successful sign-ins?
2. Can we learn anything more specific about where sign-ins for each cluster are coming from?

In [None]:
# Plot the locations of sign-ins per cluster against the baseline sample of successful sign-ins

common_args = dict(x="longitude", y="latitude", height=500, width=900)
display(
    clusters_df.hvplot.scatter(
        **common_args,
        title="Sign-in Locations by Cluster/Baseline",
        color="orange",
        by="cluster_id",
        alpha=0.3
    )
    * top_locations_df.hvplot.scatter(**common_args, color="green", alpha=0.3, size=10)
)
md("Successful sign-in locations in green.", "bold")
md("Note: Fainter dots indicate fewer logons; brighter dots indicate multiple logons.")

In [None]:
# Using MSTICPy's Folium Map integration for interactive geo-plotting capabilities, we can dig a bit further into the sign-in locations per cluster

clusters_df.mp_plot.folium_map(
    lat_column="latitude",
    long_column="longitude",
    layer_column='cluster_id',
    zoom_start=1,
)

### Threat Intelligence Enrichment

In this step, we can perform threat intelligence (TI) lookups using msticpy and open source TI providers such as IBM X-Force, VirusTotal, GreyNoise etc. 
The example below shows a TI lookup on a single IP using the IBM X-Force TI provider; a bulk lookup on all IPs follows the same pattern. 
<br>You will need to register with IBM X-Force and enter your API keys into `msticpyconfig.yaml`.

<details>
    <summary>Learn more...</summary>
    <p>
    </p>
    <ul>
        <li>More details are shown in the <i>A Tour of Cybersec notebook features</i> notebook</li>
        <li><a href=https://msticpy.readthedocs.io/en/latest/data_acquisition/TIProviders.html >Threat Intel Lookups in MSTICPy</a></li>
        <li> To learn more about adding TI sources, see the TI Provider setup in the <i>A Getting Started Guide For Microsoft Sentinel ML Notebooks</i> notebook
    </ul>
</details>
<br>

In [None]:
from msticpy import TILookup

ti_lookup = TILookup()
# Perform lookup on a single IOC
result = ti_lookup.lookup_ioc(observable="52.183.120.194", providers=["XForce"])
ti_lookup.result_to_df(result)

### Whois registration enrichment
In this step, we can perform WHOIS lookups on all public destination IPs and populate additional information such as ASN. You can use this output to further filter known ASNs from the results.

In [None]:
from msticpy.context.ip_utils import get_whois_info

num_ips = len(df["DestinationIP"].unique())
print(f"Performing WhoIs lookups for {num_ips} IPs ", end="")
df["DestASN"] = df.apply(lambda x: get_whois_info(x.DestinationIP, True), axis=1)
df["DestASNFull"] = df.apply(lambda x: x.DestASN[1], axis=1)
df["DestASN"] = df.apply(lambda x: x.DestASN[0], axis=1)

#Display results
df.head()

### Other

There is a lot more data available in the SigninLogs table that we haven't looked at. Using the MSTICPy `DataViewer` control below, you can interactively inspect your raw data to see if anything stands out.

**Every security investigation is different, and will depend heavily on your data and environment.** There are many more tools (including those in MSTICPy) that you may wish to use to further your investigation. Take a look at our [guided hunting blog post]() and the [MSTICPy notebook examples](https://msticpy.readthedocs.io/en/latest/notebooksamples.html).

In [None]:
from msticpy.vis.data_viewer import DataViewer
DataViewer(clusters_df)

# 4. Create Sentinel Incidents

To help security analysts respond to these candidate password spray events, we create custom incidents in the Sentinel workspace.

MSTICPy has built-in support for reading from, and writing to, Microsoft Sentinel. Using the provided API, we first create a single incident indicating potential low and slow password spray activity. 
We then add comments to the incident giving details of each candidate campaign, including details of machines affected. This makes it easy for security analysts to make use of the outputs of this ML notebook to take further action as appropriate.

You may wish to modify the structure of the incidents written back to Sentinel based on your team's workflow.

In [None]:
from msticpy.context.azure.sentinel_core import MicrosoftSentinel

sentinel = MicrosoftSentinel(
    sub_id="1fc4ff85-c4cd-48f5-a9e4-165751ccc023",
    res_grp="soc-mstic-play",
    ws_name="dummyloganalyticsws",
)
sentinel.connect()

In [None]:
for cluster_df in cluster_details.values():
    sentinel.create_incident(
        title="Potential Low and Slow Password Spray Activity",
        severity="Low",
        first_activity_time=cluster_df.TimeGenerated.min(),
        last_activity_time=cluster_df.TimeGenerated.max(),
        description=cluster_df.to_string()
    )

In [None]:
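incidents = sentinel.list_incidents()
# Pick the most recently created incident whose title matches the one used above.
# Use .iloc[0] so we take the first row after sorting, not the row labelled 0.
created_incident_id = incidents[
    incidents["properties.title"] == "Potential Low and Slow Password Spray Activity"
].sort_values(by="properties.incidentNumber", ascending=False)["name"].iloc[0]

# cluster_df still holds the last cluster from the loop above
html_data = cluster_df.loc[cluster_df.index != "Id"].to_html(header=False)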
sentinel.post_comment(
    incident_id=created_incident_id,
    comment=html_data,
)

# Conclusion

Due to the nature of low and slow password sprays, we needed to start our hunting on very large datasets of historical sign-in logs. The sheer scale of the data made Spark a great tool for performing distributed data operations.  
We then executed several analytical queries to surface series of failed sign-in attempts with high IP volatility based on known patterns used by attackers.
To analyze this data further, we used MSTICPy's data enrichment and visualization capabilities.

Analysts can investigate further, then create incidents in Microsoft Sentinel and track their investigations there. 
Details of possible next steps to take are in the accompanying Microsoft Tech Community blog post: [Microsoft Sentinel Blog - Microsoft Tech Community](https://techcommunity.microsoft.com/t5/microsoft-sentinel-blog/bg-p/MicrosoftSentinelBlog).  
For more information on hunting and incident response playbooks for password sprays, please see [Password spray investigation | Microsoft Docs](https://docs.microsoft.com/security/compass/incident-response-playbook-password-spray).